/*******************************************************************************
 * Package: com.song.batch.config
 * Type:    BatchConfig
 * Date:    2024-01-19 21:47
 *
 * Copyright (c) 2024 LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.batch.config;


import com.song.batch.CsvBeanValidator;
import com.song.batch.CsvJobListener;
import com.song.batch.CsvLineMapper;
import com.song.batch.CsvValidatingItemProcessor;
import com.song.batch.entity.ProjectUser;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.validator.Validator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.io.IOException;

/**
 * Spring Batch configuration for importing {@link ProjectUser} records from a CSV file
 * into the database.
 *
 * <p>Declares the job infrastructure beans (repository and launcher), the import job and
 * its step, and the reader / processor / writer together with the custom listener,
 * validator and line mapper they are wired with.
 *
 * @author Songxianyang
 * @date 2024-01-19 21:47
 */
@Configuration
public class BatchConfig {

    /** Database type handed to the job repository factory. */
    private static final String DATABASE_TYPE = "mysql";

    /** Name given to the job builder; this is the job name, not the Spring bean name. */
    private static final String JOB_NAME = "myImportJob";

    /** Name given to the step builder; may be any business-specific step name. */
    private static final String STEP_NAME = "step1";

    /** Chunk size passed to the step builder. */
    private static final int CHUNK_SIZE = 1000;

    /** Classpath location of the CSV file that the reader consumes. */
    private static final String CSV_RESOURCE_PATH = "/project_user.csv";

    /** Insert statement executed by the writer; named parameters match the columns listed. */
    private static final String INSERT_SQL =
            "insert into project_user (name,age,sex,address,birthday) values (:name, :age, :sex, :address, :birthday)";

    /**
     * Builds the {@link JobRepository}, the container in which jobs are registered.
     *
     * <p>Both arguments are injected by Spring; the original author notes that Spring Boot
     * auto-configures them.
     *
     * @param dataSource         data source the repository factory is configured with
     * @param transactionManager transaction manager the repository factory is configured with
     * @return the repository produced by the factory bean
     * @throws Exception if the factory bean fails to produce the repository
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
            throws Exception {
        JobRepositoryFactoryBean repositoryFactory = new JobRepositoryFactoryBean();
        repositoryFactory.setDatabaseType(DATABASE_TYPE);
        repositoryFactory.setTransactionManager(transactionManager);
        repositoryFactory.setDataSource(dataSource);
        return repositoryFactory.getObject();
    }

    /**
     * Builds the job launcher, the component used to start jobs.
     *
     * @param dataSource         forwarded to {@link #jobRepository(DataSource, PlatformTransactionManager)}
     * @param transactionManager forwarded to {@link #jobRepository(DataSource, PlatformTransactionManager)}
     * @return a {@link SimpleJobLauncher} configured with the job repository
     * @throws Exception if obtaining the job repository fails
     */
    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        // The repository is obtained through this class's own jobRepository bean method.
        JobRepository repository = jobRepository(dataSource, transactionManager);

        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(repository);
        return launcher;
    }


    /**
     * Builds the import job, i.e. the task that is actually executed; a job is made of
     * one or more steps.
     *
     * @param jobBuilderFactory factory used to create the job builder
     * @param step              the step the job flow starts with
     * @return the assembled job
     */
    @Bean
    public Job importJob(JobBuilderFactory jobBuilderFactory, Step step) {
        // Per the original author's note, the incrementer makes every run produce a
        // distinct JobInstance (see table batch_job_instance).
        RunIdIncrementer runIdIncrementer = new RunIdIncrementer();
        CsvJobListener jobListener = csvJobListener();

        return jobBuilderFactory
                .get(JOB_NAME)
                .incrementer(runIdIncrementer)
                // Further steps could be chained onto the flow here.
                .flow(step)
                .end()
                .listener(jobListener)
                .build();
    }


    /**
     * Builds the step, which bundles an ItemReader, an ItemProcessor and an ItemWriter.
     * Step executions can be inspected in table batch_step_execution.
     *
     * @param stepBuilderFactory factory used to create the step builder
     * @param reader             reader bound to the step
     * @param writer             writer bound to the step
     * @param processor          processor bound to the step
     * @return the assembled chunk-oriented step
     */
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory,
                      ItemReader<ProjectUser> reader,
                      ItemWriter<ProjectUser> writer,
                      ItemProcessor<ProjectUser, ProjectUser> processor) {
        // Items are handled in chunks of CHUNK_SIZE (1000 records per commit, per the
        // original author's note); reader, processor and writer are then bound in turn.
        return stepBuilderFactory
                .get(STEP_NAME)
                .<ProjectUser, ProjectUser>chunk(CHUNK_SIZE)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }


    /**
     * Builds the item reader, the component that reads the input data.
     *
     * @return a {@link FlatFileItemReader} over the CSV classpath resource
     * @throws IOException declared by the signature; no statement in this method throws it
     */
    @Bean
    public ItemReader<ProjectUser> reader() throws IOException {
        // Location of the CSV file on the classpath.
        ClassPathResource csvResource = new ClassPathResource(CSV_RESOURCE_PATH);

        FlatFileItemReader<ProjectUser> csvReader = new FlatFileItemReader<>();
        csvReader.setResource(csvResource);
        // The line mapper maps each CSV line onto the domain model class.
        csvReader.setLineMapper(csvLineMapper());
        return csvReader;
    }

    /**
     * Builds the item processor, the component that processes each item.
     *
     * @return a {@link CsvValidatingItemProcessor} using the validator from
     *         {@link #csvBeanValidator()}
     */
    @Bean
    public ItemProcessor<ProjectUser, ProjectUser> processor() {
        // Custom processor implementation, validating with the CsvBeanValidator bean.
        CsvValidatingItemProcessor validatingProcessor = new CsvValidatingItemProcessor();
        validatingProcessor.setValidator(csvBeanValidator());
        return validatingProcessor;
    }

    /**
     * Builds the item writer, the component that outputs the data.
     *
     * @param dataSource data source the writer is configured with
     * @return a {@link JdbcBatchItemWriter} that executes the insert statement via JDBC batching
     */
    @Bean
    public ItemWriter<ProjectUser> writer(DataSource dataSource) {
        JdbcBatchItemWriter<ProjectUser> jdbcWriter = new JdbcBatchItemWriter<>();
        jdbcWriter.setDataSource(dataSource);
        // SQL statement executed for the batch.
        jdbcWriter.setSql(INSERT_SQL);
        // Named SQL parameters are supplied through a bean-property based provider.
        jdbcWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return jdbcWriter;
    }


    /**
     * Custom job listener.
     *
     * @return a new {@link CsvJobListener}
     */
    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener();
    }


    /**
     * Custom validator.
     *
     * @return a new {@link CsvBeanValidator}
     */
    @Bean
    public Validator<ProjectUser> csvBeanValidator() {
        return new CsvBeanValidator<>();
    }

    /**
     * Custom line mapper.
     *
     * @return a new {@link CsvLineMapper}
     */
    @Bean
    public CsvLineMapper csvLineMapper() {
        return new CsvLineMapper();
    }


}
